Data Ingestion using Amazon Kinesis Data Streams with S3 Data Lake
This topic describes how to create a data ingestion pipeline that uses Amazon Kinesis Data Streams as the data source, Databricks for data integration, and an Amazon S3 data lake as the target.
Prerequisites
- Access to a configured Amazon S3 instance that is used as the data lake in the pipeline.
- A configured instance of Amazon Kinesis Data Streams. For information about configuring Kinesis, refer to the following topic: Configuring Amazon Kinesis Data Streams.
Creating a data ingestion pipeline
- On the home page of Data Pipeline Studio, add the following stages and connect them as shown below:
- Data Source: Amazon Kinesis Data Streams
- Data Integration: Databricks
- Data Lake: Amazon S3
- Configure the Kinesis node and the Amazon S3 node.
- Click the Databricks node, and then click Create Job.
Complete the following steps to create a data integration job:
Provide job details for the data integration job:
- Template - Based on the source and destination that you choose in the data pipeline, the template is automatically selected.
- Job Name - Provide a name for the data integration job.
- Node Rerun Attempts - Specify the number of times the job is rerun in case of failure. The default value is set at the pipeline level.
- Source - This is automatically selected based on the source added in the pipeline.
- Datastore - The datastore that you configured for the Kinesis node is auto-populated.
- Stream Start Position - The position in the data stream from which to start streaming (see the sketch after these source settings). Choose one of the following options:
  - latest - Start streaming just after the most recent record in the shard, so that you always read the most recent data.
  - trim_horizon - Start streaming from the last untrimmed record in the shard, which is the oldest data record in the shard.
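Conceptually, the Stream Start Position corresponds to the initial position of the streaming read that the job performs. The following is a minimal sketch, assuming the Databricks Kinesis connector and a hypothetical stream name and region; the job that Data Pipeline Studio generates may configure this differently.

```python
# Minimal sketch of a streaming read from Kinesis on Databricks.
# Assumes a Databricks notebook where `spark` is predefined; the stream
# name and region below are hypothetical placeholders.
kinesis_df = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "orders-stream")   # hypothetical stream name
    .option("region", "us-east-1")           # hypothetical region
    # "latest"       -> read only records arriving after the query starts
    # "trim_horizon" -> start from the oldest untrimmed record in each shard
    .option("initialPosition", "trim_horizon")
    .load()
)

# The Kinesis source delivers the record payload as binary; cast it to a
# string before parsing it further (for example, as JSON).
decoded_df = kinesis_df.selectExpr("CAST(data AS STRING) AS payload")
```

As a rule of thumb, trim_horizon is useful when you want to backfill records already in the stream, while latest is appropriate when only newly arriving events matter.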
- Target - This is automatically selected based on the target added in the pipeline.
- Datastore - The datastore that you configured for the Amazon S3 node is auto-populated.
- Choose Target Format - Select the file format in which the data is stored at the target node. Select one of the following options:
  - Parquet
  - JSON
- Target Folder (Optional) - Select a target folder in the S3 bucket. Click the pencil icon and browse to the folder. If you do not select a folder, the data is stored at the root level of the S3 bucket.
  Note: On selecting a target folder, you may see the following message: "If you choose an existing folder for target path, then it may lead to job failure. To avoid this, choose a different folder." In that case, provide a folder name in the Folder field; it is created inside the target folder.
- Folder (Optional) - Provide a folder name. This folder is created inside the target folder.
- Operation Type - Choose the type of operation to perform on the data, from the following options:
  - Append - Adds new data at the end of the table without deleting the existing content.
  - Overwrite - Replaces the entire content of the table with the new data.
- Enable Partitioning - You can create partitions in S3 tables. Currently, Data Pipeline Studio supports date-based partitioning. Enable the toggle for partitioning.
  In Date Based Partitioning, select one of the following options from the dropdown list:
  - Yearly - Select this option to create partitions for each year. The year is added to the file path.
  - Monthly - Select this option to create partitions for each month of the current year. The year and month are added to the file path.
  - Daily - Select this option to create partitions for each day. The year, month, and day are added to the file path.
  Add prefix for creating partition folder name - Provide a prefix to add to the folder name that is created for the partition.
- Final File Path - Depending on the partitioning option you choose and the prefix you provide, the final file path is created, as illustrated in the sketch below.
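To illustrate how the target format, operation type, and date-based partitioning shape the files written to S3, here is a minimal, batch-style sketch. The bucket, folders, columns, and sample data are hypothetical; the generated job derives the actual date columns, prefixes, and layout itself.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data standing in for ingested Kinesis records.
df = spark.createDataFrame(
    [("order-1", "2024-05-17T10:15:00Z"), ("order-2", "2024-05-18T08:30:00Z")],
    ["order_id", "event_time"],
)

# Derive date columns used for partitioning.
df = (
    df.withColumn("event_ts", F.to_timestamp("event_time"))
      .withColumn("year", F.date_format("event_ts", "yyyy"))
      .withColumn("month", F.date_format("event_ts", "MM"))
      .withColumn("day", F.date_format("event_ts", "dd"))
)

target_path = "s3://my-data-lake/target-folder/sub-folder/"  # hypothetical path

(
    df.write
      .format("parquet")                    # or "json", per Choose Target Format
      .mode("append")                       # or "overwrite", per Operation Type
      .partitionBy("year", "month", "day")  # Daily; drop "day" (and "month") for Monthly/Yearly
      .save(target_path)
)
# With Daily partitioning, files land under paths such as:
#   s3://my-data-lake/target-folder/sub-folder/year=2024/month=05/day=17/part-...
```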
In this step, you map the columns from the target data to custom columns. This step helps you edit the columns in the target: you can deselect the columns that you do not require and rename columns as per your requirement, as illustrated in the sketch after this list.
Filter columns - From the list of columns that is populated, select or deselect columns according to your use case and provide custom names for certain columns while mapping them. You can use the search option to find specific columns.
- Default Column Name - View the default names of the columns.
- Custom Column Name - Map the columns from the target table to the default column names.
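Conceptually, the filtering and mapping step corresponds to dropping the deselected columns and renaming the remaining ones before the data is written. A minimal sketch, continuing from the `df` of the earlier sketch; the column names and mappings are hypothetical placeholders.

```python
# Drop deselected columns and apply custom names before writing.
column_mapping = {
    "order_id": "OrderID",       # Default Column Name -> Custom Column Name
    "event_time": "EventTime",
}
deselected_columns = ["event_ts"]  # columns deselected in the Filter columns list

mapped_df = df.drop(*[c for c in deselected_columns if c in df.columns])
for default_name, custom_name in column_mapping.items():
    if default_name in mapped_df.columns:
        mapped_df = mapped_df.withColumnRenamed(default_name, custom_name)

mapped_df.printSchema()  # verify the final column names
```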
You can select an all-purpose cluster or a job cluster to run the configured job. If your Databricks cluster is not created through the Lazsa Platform and you want to update custom environment variables, refer to the related documentation.
- Cluster - Select the all-purpose cluster that you want to use for the data integration job from the dropdown list. Alternatively, to create a job cluster, provide the following details:
| Cluster Details | Description |
|---|---|
| Choose Cluster | Provide a name for the job cluster that you want to create. |
| Job Configuration Name | Provide a name for the job cluster configuration. |
| Databricks Runtime Version | Select the appropriate Databricks runtime version. |
| Worker Type | Select the worker type for the job cluster. |
| Workers | Enter the number of workers to be used for running the job in the job cluster. You can either have a fixed number of workers or choose autoscaling. |
| Enable Autoscaling | Autoscaling scales the number of workers up or down within the range that you specify. This helps reallocate workers to a job during its compute-intensive phase; once the compute requirement reduces, the excess workers are removed. This helps control your resource costs. |
| Cloud Infrastructure Details | |
| First on Demand | Provide the number of cluster nodes that are marked as first_on_demand. The first_on_demand nodes of the cluster are placed on on-demand instances. |
| Availability | Choose the type of EC2 instances (on-demand or spot) used to launch your Apache Spark clusters. |
| Zone | Identifier of the availability zone or data center in which the cluster resides. The provided availability zone must be in the same region as the Databricks deployment. |
| Instance Profile ARN | Provide an instance profile ARN that can access the target Amazon S3 bucket. |
| EBS Volume Type | The type of EBS volume that is launched with this cluster. |
| EBS Volume Count | The number of EBS volumes launched for each instance of the cluster. |
| EBS Volume Size | The size of the EBS volume to be used for the cluster. |
| Additional Details | |
| Spark Config | To fine-tune Spark jobs, provide custom Spark configuration properties in key-value pairs. |
| Environment Variables | Configure custom environment variables that you can use in init scripts. |
| Logging Path (DBFS Only) | Provide the logging path to deliver the logs for the Spark jobs. |
| Init Scripts | Provide the init (initialization) scripts that run during the startup of each cluster. |
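The job cluster fields in the table above broadly mirror the `new_cluster` specification of the Databricks Jobs API. The following Python dictionary is a hedged sketch of such a specification; every value shown (runtime version, node type, ARN, zone, volume settings) is illustrative, not the exact payload that the Lazsa Platform submits.

```python
# Hedged sketch of a Databricks Jobs API `new_cluster` specification that
# mirrors the fields in the table above. All values are illustrative.
new_cluster = {
    "spark_version": "13.3.x-scala2.12",                 # Databricks Runtime Version
    "node_type_id": "i3.xlarge",                         # Worker Type
    "autoscale": {"min_workers": 2, "max_workers": 8},   # Workers + Enable Autoscaling
    "aws_attributes": {
        "first_on_demand": 1,                            # First on Demand
        "availability": "SPOT_WITH_FALLBACK",            # Availability
        "zone_id": "us-east-1a",                         # Zone
        "instance_profile_arn": "arn:aws:iam::123456789012:instance-profile/s3-access",  # Instance Profile ARN
        "ebs_volume_type": "GENERAL_PURPOSE_SSD",        # EBS Volume Type
        "ebs_volume_count": 1,                           # EBS Volume Count
        "ebs_volume_size": 100,                          # EBS Volume Size (GB)
    },
    "spark_conf": {"spark.sql.shuffle.partitions": "64"},           # Spark Config
    "spark_env_vars": {"ENVIRONMENT": "dev"},                       # Environment Variables
    "cluster_log_conf": {"dbfs": {"destination": "dbfs:/cluster-logs/kinesis-job"}},  # Logging Path (DBFS Only)
    "init_scripts": [{"dbfs": {"destination": "dbfs:/init/install-libs.sh"}}],        # Init Scripts
}
```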
You can configure the SQS and SNS services to send notifications related to the node in this job. This provides information about various events related to the node without actually connecting to the Lazsa Platform.
SQS and SNS
- Configurations - Select an SQS or SNS configuration that is integrated with the Lazsa Platform.
- Events - Enable the events for which you want to receive notifications.
- Event Details - From the dropdown list, select the details of the events that you want to include in the notifications.
- Additional Parameters - Provide any additional parameters to be added to the SQS and SNS notifications. A sample JSON is provided; you can use it to write logic for processing the events.
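For reference, a node-event notification delivered through SNS is simply a JSON message published to a topic. The following is a hedged sketch of publishing a sample event with boto3, for example to test your downstream processing logic; the topic ARN and payload fields are assumptions and may not match the exact JSON that the platform emits.

```python
import json
import boto3

# Hedged sketch: publish a sample node-event notification to an SNS topic.
# The topic ARN and payload fields below are hypothetical placeholders.
sns = boto3.client("sns", region_name="us-east-1")

event_payload = {
    "pipeline": "kinesis-to-s3",
    "node": "databricks-data-integration",
    "event": "JOB_FAILED",
    "additionalParameters": {"team": "data-platform"},  # Additional Parameters
}

sns.publish(
    TopicArn="arn:aws:sns:us-east-1:123456789012:node-events",  # hypothetical ARN
    Subject="Data integration job event",
    Message=json.dumps(event_payload),
)
```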
Running the data ingestion pipeline
After you have created the data integration job with Amazon Kinesis Data Streams, ensure that you publish the pipeline. If you haven't already done so, click Publish. You can then run the pipeline in the following ways:
- Open the Data Streams window, which lists the data streams in the pipeline. Enable the toggle for the stream that you want to use to fetch data.
- Click the Databricks node, and then click Start to run the data integration job. Navigate to Data Streams and click Refresh to confirm that Kinesis streaming is enabled.

You can see that the data stream that you enabled is now running. Click the refresh icon to view the latest information about the number of events processed.
Troubleshooting a failed data integration job
When you click the Databricks node in the pipeline, you can tell whether your data integration job has failed by looking at the status of the job.
- Click the Databricks node in the pipeline.
- Check the status of the Databricks integration job. The status can be one of the following:
  - Running
  - Canceled
  - Pending
  - Failed
- If the job status is Failed, click the ellipsis (...) and then click Open Databricks Dashboard.
- You are navigated to the specific Databricks job, which shows the list of job runs. Click the job run for which you want to view the details.
- View the details and check for errors. You can also retrieve the run status programmatically, as shown in the sketch below.
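If you prefer to inspect a failed run programmatically instead of through the dashboard, the Databricks Jobs API exposes the run state. A minimal sketch using the REST API directly; the workspace URL, access token, and run ID are placeholders.

```python
import requests

# Hedged sketch: fetch the state and error summary of a specific job run
# through the Databricks Jobs API 2.1. Host, token, and run_id are placeholders.
DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"
TOKEN = "<personal-access-token>"

resp = requests.get(
    f"{DATABRICKS_HOST}/api/2.1/jobs/runs/get",
    headers={"Authorization": f"Bearer {TOKEN}"},
    params={"run_id": 123456},
)
resp.raise_for_status()
run = resp.json()

state = run.get("state", {})
print("Life cycle state:", state.get("life_cycle_state"))  # e.g. TERMINATED
print("Result state:", state.get("result_state"))          # e.g. FAILED
print("State message:", state.get("state_message"))        # error summary, if any
```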
What's next? Data Ingestion using Amazon Kinesis Data Streams with Snowflake Data Lake